package com.atguigu.flink.chapter11.function;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/12/22 10:19
 */
public class Flink04_TableAggregate {
    /**
     * Demo entry point: builds a small in-memory stream of {@link WaterSensor} records,
     * converts it to a table, and runs the {@link Top2} table aggregate function over the
     * {@code vc} column grouped by {@code id}, printing the result.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Set the "rest.port" configuration key to 20000 before creating the environment.
        Configuration restConfig = new Configuration();
        restConfig.setInteger("rest.port", 20000);

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(restConfig);
        env.setParallelism(1);

        // Fixed demo input. NOTE(review): the constructor arguments are presumably
        // (id, timestamp, vc) -- confirm against WaterSensor.
        DataStreamSource<WaterSensor> readings = env.fromElements(
            new WaterSensor("sensor_1", 1000L, 10),
            new WaterSensor("sensor_1", 2000L, 20),
            new WaterSensor("sensor_1", 4000L, 40),
            new WaterSensor("sensor_1", 5000L, 50),
            new WaterSensor("sensor_2", 3000L, 30),
            new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(readings);
        tableEnv.createTemporaryView("sensor", sensorTable);

        // 1. Usage in the Table API
        // 1.1 Inline usage: the function class is passed directly to call(...)
        Table top2PerSensor = sensorTable
            .groupBy($("id"))
            .flatAggregate(call(Top2.class, $("vc")))
            .select($("id"), $("rank"), $("value"));
        top2PerSensor.execute().print();

        // 1.2 Register the function first, then use it (not implemented in this demo)

        // Author's note: table aggregate functions currently cannot be used directly in SQL.
    }
    
    /**
     * Table aggregate function that tracks the two largest {@code vc} values it has been
     * given and emits them as ranked {@link Result} rows ("第一" for the largest, "第二" for
     * the second largest).
     *
     * <p>An empty slot is represented by {@link Integer#MIN_VALUE} rather than {@code 0}, so
     * that zero and negative inputs are ranked correctly and a slot that has never been
     * filled is not emitted. As a consequence an actual input equal to
     * {@code Integer.MIN_VALUE} is indistinguishable from "no value".
     */
    public static class Top2 extends TableAggregateFunction<Result, FirstSecond> {

        /** Sentinel stored in an accumulator slot that has not received a value yet. */
        private static final int EMPTY = Integer.MIN_VALUE;

        /**
         * Creates the accumulator with both slots marked empty.
         *
         * @return a fresh accumulator whose {@code first} and {@code second} are the sentinel
         */
        @Override
        public FirstSecond createAccumulator() {
            FirstSecond acc = new FirstSecond();
            // Override the field defaults so that 0 is treated as ordinary data.
            acc.first = EMPTY;
            acc.second = EMPTY;
            return acc;
        }

        /**
         * Folds one input value into the accumulator.
         *
         * @param acc the accumulator to update (must be the first parameter)
         * @param vc  the incoming value; {@code null} is ignored
         */
        public void accumulate(FirstSecond acc, Integer vc) {
            if (vc == null) {
                // Skip missing values instead of failing on auto-unboxing.
                return;
            }
            if (vc > acc.first) {
                // New maximum: the previous maximum becomes the runner-up.
                acc.second = acc.first;
                acc.first = vc;
            } else if (vc > acc.second) {
                acc.second = vc;
            }
        }

        /**
         * Emits one row per filled slot of the accumulator.
         *
         * @param acc the accumulator to read (must be the first parameter)
         * @param out collector receiving the ranked rows
         */
        public void emitValue(FirstSecond acc, Collector<Result> out) {
            if (acc.first != EMPTY) {
                out.collect(new Result("第一", acc.first));
            }
            if (acc.second != EMPTY) {
                out.collect(new Result("第二", acc.second));
            }
        }
    }
    
    /**
     * Output row type of {@link Top2}: a rank label paired with the ranked value.
     *
     * <p>NOTE(review): the field names {@code rank} and {@code value} match the columns
     * selected in {@code main}; presumably Flink derives the columns from these public
     * fields, so they should not be renamed without checking -- confirm.
     */
    public static class Result {
        // Rank label assigned by the producer of this row.
        public String rank;
        // The value holding that rank.
        public Integer value;
        
        /**
         * Creates a result row.
         *
         * @param rank  the rank label
         * @param value the value holding that rank
         */
        public Result(String rank, Integer value) {
            this.rank = rank;
            this.value = value;
        }
        
    }
    
    /**
     * Accumulator type of {@link Top2}: holds the largest and second-largest value
     * recorded so far. Both fields are initialized to 0 by their field initializers.
     */
    public static class FirstSecond {
        // Largest value recorded so far.
        public Integer first = 0;
        // Second-largest value recorded so far.
        public Integer second = 0;
        
    }
}
/*
For every incoming record, output the top 2 water levels (vc) seen so far for that sensor id.

new WaterSensor("sensor_1", 1000L, 10),
    第一  10

new WaterSensor("sensor_1", 2000L, 20),
    第一  20
    第二  10

new WaterSensor("sensor_1", 4000L, 40),
    第一  40
    第二  20
 */